package org.budo.warehouse.logic.bean;

import java.util.Map;

import javax.annotation.Resource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.budo.druid.util.DruidUtil;
import org.budo.dubbo.protocol.async.config.BudoAsyncReferenceBean;
import org.budo.dubbo.protocol.async.config.BudoAsyncServiceBean;
import org.budo.dubbo.protocol.async.repository.AsyncRepository;
import org.budo.dubbo.protocol.async.repository.activemq.ActiveMQAsyncRepository;
import org.budo.dubbo.protocol.async.repository.kafka.KafkaAsyncRepository;
import org.budo.dubbo.protocol.async.repository.redis.RedisAsyncRepository;
import org.budo.jedis.pool.BudoJedisPool;
import org.budo.jedis.pool.factory.BudoJedisPoolFactory;
import org.budo.support.java.net.util.JavaNetUtil;
import org.budo.support.javax.sql.util.JdbcUtil;
import org.budo.support.lang.util.MapUtil;
import org.budo.support.lang.util.StringUtil;
import org.budo.support.spring.bean.factory.support.BeanBuilder;
import org.budo.support.spring.util.SpringUtil;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.consumer.LogDataConsumer;
import org.budo.warehouse.logic.consumer.NullDataConsumer;
import org.budo.warehouse.logic.consumer.async.AsyncDataConsumerWrapper;
import org.budo.warehouse.logic.consumer.buffer.BufferedDataConsumerWrapper;
import org.budo.warehouse.logic.consumer.jdbc.JdbcDataConsumer;
import org.budo.warehouse.logic.consumer.jdbc.MysqlDataConsumer;
import org.budo.warehouse.logic.consumer.mail.MailDataConsumer;
import org.budo.warehouse.logic.consumer.mapping.FieldMappingDataConsumerWrapper;
import org.budo.warehouse.logic.consumer.webhook.WebhookDataConsumer;
import org.budo.warehouse.logic.consumer.webhook.DingtalkRobotDataConsumer;
import org.budo.warehouse.logic.producer.canal.WarehouseCanalMessageHandler;
import org.budo.warehouse.logic.util.PipelineUtil;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Dynamically registers Spring beans for warehouse data pipelines at runtime:
 * canal message producers, async repositories (kafka / activemq / redis) and
 * the data-consumer chain (field-mapping -> buffered -> async -> target).
 *
 * <p>Every register method is idempotent: it first checks
 * {@link ApplicationContext#containsBean(String)} (or a cached lookup) and
 * returns the existing bean/id when the bean was already registered.
 *
 * @author limingwei
 */
@Slf4j
@Component
public class LogicDynamicBeanProvider {
    // URL scheme prefixes of the supported async repository backends.
    // Also used to strip the scheme off the data node url (see kafka/activemq below).
    private static final String KAFKA_URL_PREFIX = "async:kafka://";
    private static final String ACTIVEMQ_URL_PREFIX = "async:activemq://";
    private static final String REDIS_URL_PREFIX = "async:redis://";

    @Resource
    private ApplicationContext applicationContext;

    @Resource
    private IDataNodeService dataNodeService;

    /**
     * Registers an async message receiver ({@link BudoAsyncServiceBean}) for the
     * given pipeline. Incoming messages are handed to the pre-existing
     * "dispatcherDataConsumer" bean. No-op when the bean already exists.
     */
    public void asyncDataProducer(DataNode dataNode, Pipeline pipeline) {
        String beanId = "asyncDataProducer-" + dataNode.getId() + "-" + pipeline.getId() + "-" + dataNode.getUrl();

        if (this.applicationContext.containsBean(beanId)) {
            log.warn("#212 bean exists, return, beanId={}", beanId);
            return;
        }

        String asyncRepositoryBeanName = this.asyncRepositoryName(dataNode);
        // NOTE(review): two pipelines can resolve to the same destinationName —
        // confirm how duplicate destinations should be handled.
        String destinationName = PipelineUtil.asyncDestinationName(pipeline);

        new BeanBuilder() //
                .id(beanId) //
                .type(BudoAsyncServiceBean.class) //
                .property("interfaceType", DataConsumer.class) //
                .property("asyncRepositoryBeanName", asyncRepositoryBeanName) //
                .property("destinationName", destinationName) //
                .ref("instance", "dispatcherDataConsumer") // received messages are handled by dispatcherDataConsumer
                .registerTo(this.applicationContext) //
                .get();

        log.info("#149 asyncRepositoryBeanName={}", asyncRepositoryBeanName);
    }

    /**
     * Resolves (registering on demand) the {@link AsyncRepository} for the
     * pipeline's target data node.
     */
    public AsyncRepository asyncRepository(Pipeline pipeline) {
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());
        String asyncRepositoryName = this.asyncRepositoryName(targetDataNode);
        return (AsyncRepository) SpringUtil.getBean(asyncRepositoryName);
    }

    /**
     * Registers a canal instance bean for a mysql data node after verifying the
     * port is reachable and the credentials work. No-op when the bean already
     * exists or any pre-check fails (failures are logged, not thrown).
     */
    public void canalDataProducer(DataNode dataNode) {
        String canalMessageHandlerId = this.canalMessageHandlerId(dataNode);

        // The bean name doubles as the canal destination and as a zookeeper
        // path, so it must obey the naming rules of both.
        String beanId = PipelineUtil.canalDestinationName(dataNode);

        if (applicationContext.containsBean(beanId)) {
            log.warn("#103 bean exists, return, beanId={}, dataNode={}", beanId, dataNode);
            return;
        }

        String host = JdbcUtil.getHost(dataNode.getUrl());
        Integer port = JdbcUtil.getPort(dataNode.getUrl());
        port = null == port ? 3306 : port; // default mysql port

        // Check the port is reachable before spinning up a canal instance.
        if (!JavaNetUtil.isPortUsing(host, port, 2000, true)) {
            log.error("#109 canalDataProducer return, host={}, port={}, url={}, dataNode.id={}", //
                    host, port, dataNode.getUrl(), dataNode.getId());
            return;
        }

        String username = dataNode.getUsername();

        String password = dataNode.getPassword();
        password = null == password ? "" : password;
        password = DruidUtil.rsaDecrypt(password); // stored password is rsa-encrypted

        // Check the database is reachable and the account is valid.
        boolean checkConnectMysql_2 = JdbcUtil.checkConnectMysql_2(host, port, username, password);
        if (!checkConnectMysql_2) {
            log.error("#115 checkConnectMysql_2 false, dataNode={}", dataNode);
            return;
        }

        new BeanBuilder() //
                .id(beanId) //
                .parent("abstractCanalInstance") //
                .property("slaveId", 1000 + dataNode.getId()) // offset so it should not clash with real slave ids
                .property("host", host) //
                .property("port", port + "") //
                .property("username", username) //
                .property("password", password) //
                .ref("canalMessageHandler", canalMessageHandlerId) //
                .registerTo(this.applicationContext);
    }

    /**
     * Builds the fully-wrapped consumer chain for a pipeline:
     * target consumer wrapped by async, buffer and field-mapping layers.
     */
    public DataConsumer dataConsumer(Pipeline pipeline) {
        DataConsumer targetDataConsumer = this.targetDataConsumer(pipeline); // the final destination consumer

        return this.wrapDataConsumer(pipeline, targetDataConsumer);
    }

    /**
     * Chooses the target consumer implementation from the target data node's
     * url scheme. Returns null when the target node cannot be found; throws
     * when the url is blank or the scheme is unsupported.
     */
    private DataConsumer targetDataConsumer(Pipeline pipeline) {
        Integer targetDataNodeId = pipeline.getTargetDataNodeId();
        DataNode targetDataNode = dataNodeService.findByIdCached(targetDataNodeId);
        if (null == targetDataNode) {
            log.error("#154 targetDataNode is null, targetDataNodeId={}, pipeline={}", targetDataNodeId, pipeline);
            return null;
        }

        String url = targetDataNode.getUrl();

        if (null == url || url.trim().isEmpty()) {
            throw new IllegalArgumentException("#49 url=" + url + ", targetDataNodeId=" + targetDataNodeId + ", pipelineId=" + pipeline.getId());
        }

        if (url.startsWith("jdbc:")) { // relational database target
            return this.jdbcDataConsumer(pipeline);
        }

        if (url.startsWith("async:")) { // e.g. async:kafka://192.168.4.253:9092, async:activemq://tcp://192.168.4.251:61616
            return this.asyncDataConsumer(pipeline);
        }

        if (url.startsWith("mail")) { // mail alert
            return this.mailDataConsumer(pipeline);
        }

        if (url.startsWith("https://") || url.startsWith("http://")) { // http webhook (incl. dingtalk robot)
            return this.webhookDataConsumer(pipeline);
        }

        if (url.startsWith("log")) { // log-only consumer
            return new LogDataConsumer().setPipeline(pipeline);
        }

        // Keep last: "null" would also match looser prefixes above if reordered.
        if (url.startsWith("null")) { // no-op consumer
            return new NullDataConsumer();
        }

        throw new RuntimeException("#49 targetDataNode=" + targetDataNode);
    }

    /**
     * Registers (or returns the cached) webhook consumer for the pipeline.
     * Dingtalk robot urls get the specialized {@link DingtalkRobotDataConsumer}.
     */
    private DataConsumer webhookDataConsumer(Pipeline pipeline) {
        Integer targetDataNodeId = pipeline.getTargetDataNodeId();
        DataNode targetDataNode = dataNodeService.findByIdCached(targetDataNodeId);

        String beanId = "WebhookDataConsumer-" + targetDataNode.getId() + "-" + targetDataNode.getUrl() + "-" + targetDataNode.getUsername() //
                + "-" + pipeline.getId() //
                + "-" + pipeline.getSourceSchema() + "-" + pipeline.getSourceTable() //
                + "-" + pipeline.getTargetSchema() + "-" + pipeline.getTargetTable();

        DataConsumer dataConsumer = SpringUtil.getBeanCached(this.applicationContext, beanId);
        if (null != dataConsumer) {
            return dataConsumer;
        }

        Class<?> type = WebhookDataConsumer.class;
        if (StringUtil.startWith(targetDataNode.getUrl(), "https://oapi.dingtalk.com/robot/send")) {
            type = DingtalkRobotDataConsumer.class;
        }

        return (DataConsumer) new BeanBuilder() //
                .id(beanId) //
                .type(type) //
                .propertyValue("pipeline", pipeline) //
                .registerTo(this.applicationContext) //
                .get();
    }

    /**
     * Registers (once) the canal message handler bean for a data node and
     * returns its bean id.
     */
    private String canalMessageHandlerId(DataNode dataNode) {
        String beanId = "canalMessageHandler-" + dataNode.getId() + "-" + dataNode.getUrl();

        if (this.applicationContext.containsBean(beanId)) {
            log.warn("#81 bean exists, return, beanId={}", beanId);
            return beanId;
        }

        new BeanBuilder() //
                .id(beanId) //
                .type(WarehouseCanalMessageHandler.class) //
                .property("dataNodeId", dataNode.getId() + "") //
                .registerTo(this.applicationContext);

        return beanId;
    }

    /**
     * Registers (or returns the cached) mail-alert consumer for the pipeline.
     */
    private DataConsumer mailDataConsumer(Pipeline pipeline) {
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());

        String beanId = "MailDataConsumer-" + targetDataNode.getId() + "-" + targetDataNode.getUrl() + "-" + targetDataNode.getUsername() //
                + "-" + pipeline.getId() //
                + "-" + pipeline.getSourceSchema() + "-" + pipeline.getSourceTable() //
                + "-" + pipeline.getTargetSchema() + "-" + pipeline.getTargetTable();

        DataConsumer dataConsumer = SpringUtil.getBeanCached(this.applicationContext, beanId);
        if (null != dataConsumer) {
            return dataConsumer;
        }

        return (DataConsumer) new BeanBuilder() //
                .id(beanId) //
                .type(MailDataConsumer.class) //
                .propertyValue("pipeline", pipeline) //
                .registerTo(this.applicationContext) //
                .get();
    }

    /**
     * Registers (or returns the cached) jdbc consumer for the pipeline;
     * mysql urls get the specialized {@link MysqlDataConsumer}.
     */
    private DataConsumer jdbcDataConsumer(Pipeline pipeline) {
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());

        Class<?> type = JdbcDataConsumer.class;
        if (targetDataNode.getUrl().startsWith("jdbc:mysql://")) {
            type = MysqlDataConsumer.class;
        }

        String beanId = type.getSimpleName() + "-" + targetDataNode.getUrl() + "-" + targetDataNode.getUsername() //
                + "-" + pipeline.getId() + "-" + pipeline.getSourceSchema() + "-" + pipeline.getSourceTable() //
                + "-" + pipeline.getTargetSchema() + "-" + pipeline.getTargetTable();

        DataConsumer dataConsumer = SpringUtil.getBeanCached(this.applicationContext, beanId);
        if (null != dataConsumer) {
            return dataConsumer;
        }

        return (DataConsumer) new BeanBuilder() //
                .id(beanId) //
                .type(type) //
                .propertyValue("pipeline", pipeline) //
                .registerTo(this.applicationContext) //
                .get();
    }

    /**
     * Resolves the async repository bean id for a data node, registering the
     * repository on first use. Throws on unsupported url schemes.
     */
    private String asyncRepositoryName(DataNode dataNode) {
        if (dataNode.getUrl().startsWith(KAFKA_URL_PREFIX)) {
            return this.kafkaAsyncRepositoryName(dataNode);
        }

        if (dataNode.getUrl().startsWith(ACTIVEMQ_URL_PREFIX)) {
            return this.activemqAsyncRepositoryName(dataNode);
        }

        if (dataNode.getUrl().startsWith(REDIS_URL_PREFIX)) {
            return this.redisAsyncRepositoryName(dataNode);
        }

        throw new RuntimeException("#139 targetDataNode=" + dataNode);
    }

    /**
     * Registers (once) a {@link RedisAsyncRepository} for the data node and
     * returns its bean id. The url is expected to look like
     * "async:redis://host:port/db".
     */
    private String redisAsyncRepositoryName(DataNode dataNode) {
        String beanId = "RedisAsyncRepository-" + dataNode.getId() + "-" + dataNode.getUrl();

        if (this.applicationContext.containsBean(beanId)) {
            log.warn("#260 bean exists, return, beanId={}, dataNode={}", beanId, dataNode);
            return beanId;
        }

        String host = JdbcUtil.getHost(dataNode.getUrl());

        Integer port = JdbcUtil.getPort(dataNode.getUrl());
        port = null == port ? 6379 : port; // default redis port

        // BUGFIX: previously the database index was taken from JdbcUtil.getPort,
        // which selected database 6379 (invalid) whenever the url carried a port.
        // Parse the "/db" path segment instead, defaulting to database 0.
        int database = redisDatabase(dataNode.getUrl());

        String password = dataNode.getPassword();
        password = null == password ? "" : password;
        password = DruidUtil.rsaDecrypt(password); // stored password is rsa-encrypted

        BudoJedisPool budoJedisPool = new BudoJedisPoolFactory() //
                .setHost(host) //
                .setPort(port) //
                .setPassword(password) //
                .setDatabase(database) //
                .budoJedisPool();

        new BeanBuilder() //
                .id(beanId) //
                .type(RedisAsyncRepository.class) //
                .property("jedisPool", budoJedisPool) //
                .registerTo(this.applicationContext);

        return beanId;
    }

    /**
     * Extracts the redis database index from the path segment of a
     * "scheme://host:port/db[?params]" style url; returns 0 when the segment
     * is absent or not a number (redis default database).
     */
    private static int redisDatabase(String url) {
        int schemeEnd = url.indexOf("//");
        String rest = schemeEnd < 0 ? url : url.substring(schemeEnd + 2);

        int slash = rest.indexOf('/');
        if (slash < 0) {
            return 0; // no path segment -> default database
        }

        String db = rest.substring(slash + 1);
        int query = db.indexOf('?');
        if (query >= 0) {
            db = db.substring(0, query); // drop query params
        }

        try {
            return Integer.parseInt(db.trim());
        } catch (NumberFormatException e) {
            return 0; // non-numeric path segment -> default database
        }
    }

    /**
     * Registers (once) a {@link KafkaAsyncRepository} for the data node and
     * returns its bean id.
     */
    private String kafkaAsyncRepositoryName(DataNode dataNode) {
        String beanId = "KafkaAsyncRepository-" + dataNode.getId() + "-" + dataNode.getUrl();

        if (this.applicationContext.containsBean(beanId)) {
            return beanId; // intentionally silent: called on every message send
        }

        String kafkaUrl = dataNode.getUrl().substring(KAFKA_URL_PREFIX.length());
        int i = kafkaUrl.indexOf('?');
        if (i > 0) {
            kafkaUrl = kafkaUrl.substring(0, i); // drop query params
        }

        Map<String, Object> kafkaProperties = MapUtil.stringObjectMap("bootstrap.servers", kafkaUrl);

        new BeanBuilder() //
                .id(beanId) //
                .type(KafkaAsyncRepository.class) //
                .property("properties", kafkaProperties) //
                .registerTo(this.applicationContext);

        return beanId;
    }

    /**
     * Registers (once) an {@link ActiveMQAsyncRepository} for the data node and
     * returns its bean id.
     */
    private String activemqAsyncRepositoryName(DataNode dataNode) {
        String beanId = "ActiveMQAsyncRepository-" + dataNode.getId() + "-" + dataNode.getUrl();

        if (this.applicationContext.containsBean(beanId)) {
            log.warn("#188 bean exists, return, beanId={}, dataNode={}", beanId, dataNode);
            return beanId;
        }

        String username = dataNode.getUsername();

        String password = dataNode.getPassword();
        password = null == password ? "" : password; // guard before decrypt, consistent with the other branches
        password = DruidUtil.rsaDecrypt(password); // stored password is rsa-encrypted

        String brokerUrl = "tcp://" + dataNode.getUrl().substring(ACTIVEMQ_URL_PREFIX.length());
        int i = brokerUrl.indexOf('?');
        if (i > 0) {
            brokerUrl = brokerUrl.substring(0, i); // drop query params
        }

        new BeanBuilder() //
                .id(beanId) //
                .type(ActiveMQAsyncRepository.class) //
                .property("connectionFactory", new ActiveMQConnectionFactory(username, password, brokerUrl)) //
                .registerTo(this.applicationContext) //
                .get();

        return beanId;
    }

    /**
     * Registers (or returns the cached) async message sender
     * ({@link BudoAsyncReferenceBean}) for the pipeline's target node.
     */
    private DataConsumer asyncDataConsumer(Pipeline pipeline) {
        DataNode targetDataNode = dataNodeService.findByIdCached(pipeline.getTargetDataNodeId());

        String beanId = "asyncDataConsumer-" + targetDataNode.getId() + "-" + targetDataNode.getUrl() //
                + "-" + pipeline.getId() + "-" + pipeline.getSourceTable() + "-" + pipeline.getTargetTable();

        DataConsumer dataConsumer = SpringUtil.getBeanCached(this.applicationContext, beanId);
        if (null != dataConsumer) {
            return dataConsumer;
        }

        String asyncRepositoryBeanName = this.asyncRepositoryName(targetDataNode);
        String destinationName = PipelineUtil.asyncDestinationName(pipeline);

        return (DataConsumer) new BeanBuilder() //
                .id(beanId) //
                .type(BudoAsyncReferenceBean.class) //
                .property("interfaceType", DataConsumer.class) //
                .property("asyncRepositoryBeanName", asyncRepositoryBeanName) //
                .property("destinationName", destinationName) //
                .registerTo(this.applicationContext) //
                .get();
    }

    /** Wraps a consumer with the field-mapping layer. */
    private DataConsumer mappingConsumerWrapper(Pipeline pipeline, DataConsumer dataConsumer) {
        return this.consumerWrapper(pipeline, dataConsumer, FieldMappingDataConsumerWrapper.class);
    }

    /** Wraps a consumer with the buffering layer. */
    private DataConsumer bufferedConsumerWrapper(Pipeline pipeline, DataConsumer dataConsumer) {
        return this.consumerWrapper(pipeline, dataConsumer, BufferedDataConsumerWrapper.class);
    }

    /** Wraps a consumer with the async-dispatch layer. */
    private DataConsumer asyncConsumerWrapper(Pipeline pipeline, DataConsumer dataConsumer) {
        return this.consumerWrapper(pipeline, dataConsumer, AsyncDataConsumerWrapper.class);
    }

    /**
     * Registers (or returns the cached) wrapper bean of the given type around
     * a consumer. Throws when the target node's url is blank.
     */
    private DataConsumer consumerWrapper(Pipeline pipeline, DataConsumer dataConsumer, Class<?> wrapperType) {
        Integer targetDataNodeId = pipeline.getTargetDataNodeId();
        DataNode targetDataNode = dataNodeService.findByIdCached(targetDataNodeId);
        String url = targetDataNode.getUrl();

        if (null == url || url.trim().isEmpty()) {
            throw new IllegalArgumentException("#49 url=" + url + ", targetDataNodeId=" + targetDataNodeId + ", pipelineId=" + pipeline.getId());
        }

        String wrapperBeanId = wrapperType.getSimpleName() + "-" + targetDataNode.getId() + "-" + targetDataNode.getUrl() //
                + "-" + pipeline.getId() + "-" + pipeline.getSourceTable() + "-" + pipeline.getTargetTable();

        // NOTE(review): this uses SpringUtil.getBean where siblings use
        // getBeanCached(applicationContext, ...) — confirm whether that is deliberate.
        DataConsumer consumerWrapper = (DataConsumer) SpringUtil.getBean(wrapperBeanId);
        if (null != consumerWrapper) {
            return consumerWrapper;
        }

        return (DataConsumer) new BeanBuilder() //
                .id(wrapperBeanId) //
                .type(wrapperType) //
                .property("dataConsumer", dataConsumer) //
                .property("pipeline", pipeline) //
                .registerTo(this.applicationContext) //
                .get();
    }

    /**
     * Builds the wrapper chain around the target consumer. Execution order is
     * outermost-first: mapping -> buffered -> async -> target.
     */
    private DataConsumer wrapDataConsumer(Pipeline pipeline, DataConsumer targetDataConsumer) {
        if (null == targetDataConsumer) {
            log.error("#454 targetDataConsumer={}, pipeline={}", targetDataConsumer, pipeline);
            return null;
        }

        // Async layer: innermost wrapper, runs last.
        DataConsumer asyncConsumerWrapper = this.asyncConsumerWrapper(pipeline, targetDataConsumer);

        // Buffering layer.
        DataConsumer bufferedConsumerWrapper = this.bufferedConsumerWrapper(pipeline, asyncConsumerWrapper);

        // Field-mapping layer: outermost wrapper, runs first.
        DataConsumer mappingConsumerWrapper = this.mappingConsumerWrapper(pipeline, bufferedConsumerWrapper);
        return mappingConsumerWrapper;
    }
}